package ch.qos.logback.classic.log4j2;

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OptionalDataException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.databind.JsonNode;
import com.networknt.config.Config;

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.net.ReceiverBase;
import ch.qos.logback.classic.spi.LoggingEventVO;

public class UdpJsonReceiver extends ReceiverBase implements Runnable {
	private static LoggerContext lc = null;
	private static Logger logger = null;
	private int port = 5000;
	private volatile boolean active = true;
	private DatagramSocket datagramSocket;
	// max size so we only have to deal with one packet
	private final int maxBufferSize = 1024 * 65 + 1024;

	public void setPort(int port) {
		this.port = port;
	}

	@Override
	protected boolean shouldStart() {
		return true;
	}

	@Override
	protected void onStop() {
		this.active = false;
		addInfo("stop");
	}

	@Override
	protected Runnable getRunnableTask() {
		return this;
	}

	private void logEvent(String textEvent) {
		try {
			JsonNode json = Config.getInstance().getMapper().readTree(textEvent);
			LoggingEventVO event = LoggingEventVO.build(new LoggingEvent(json));
			Logger remoteLogger = lc.getLogger(event.getLoggerName());
			if (remoteLogger.isEnabledFor(event.getLevel())) {
				remoteLogger.callAppenders(event);
			}
		} catch (Exception e) {
			logger.warn("{}\n{}", e.getMessage(), textEvent, e);
		}
	}

	@Override
	public void run() {
		addInfo("udp port=" + port);
		try {
			this.datagramSocket = new DatagramSocket(port);
			lc = (LoggerContext) getContext();
			logger = lc.getLogger(getClass());
		} catch (SocketException e) {
			active = false;
			addError(e.getMessage(), e);
		}
		while (this.active) {
			if (datagramSocket.isClosed()) {
				// OK we're done.
				return;
			}
			try {
				final byte[] buf = new byte[maxBufferSize];
				final DatagramPacket packet = new DatagramPacket(buf, buf.length);
				datagramSocket.receive(packet);
				final ByteArrayInputStream bais = new ByteArrayInputStream(packet.getData(), packet.getOffset(),
						packet.getLength());
				logEvents(bais);
			} catch (final OptionalDataException e) {
				if (datagramSocket.isClosed()) {
					// OK we're done.
					return;
				}
				logger.error("OptionalDataException eof=" + e.eof + " length=" + e.length, e);
			} catch (final EOFException e) {
				if (datagramSocket.isClosed()) {
					// OK we're done.
					return;
				}
				logger.info("EOF encountered");
			} catch (final IOException e) {
				if (datagramSocket.isClosed()) {
					// OK we're done.
					return;
				}
				logger.error("Exception encountered on accept. Ignoring. Stack Trace :", e);
			} catch (Exception e) {
				logger.error("Unable to parse log event", e);
			}
		}
		datagramSocket.close();
	}

	private final int bufferSize = 1024;

	private final Charset charset = StandardCharsets.UTF_8;

	public void logEvents(final InputStream inputStream) throws IOException {
		String workingText = "";
		try {
			// Allocate buffer once
			final byte[] buffer = new byte[bufferSize];
			String textRemains = workingText = "";
			while (true) {
				// Process until the stream is EOF.
				final int streamReadLength = inputStream.read(buffer);
				if (streamReadLength == END) {
					// The input stream is EOF
					break;
				}
				final String text = workingText = textRemains + new String(buffer, 0, streamReadLength, charset);
				int beginIndex = 0;
				while (true) {
					// Extract and log all XML events in the buffer
					final int[] pair = getEventIndices(text, beginIndex);
					final int eventStartMarkerIndex = pair[0];
					if (eventStartMarkerIndex < 0) {
						// No more events or partial XML only in the buffer.
						// Save the unprocessed string part
						textRemains = text.substring(beginIndex);
						break;
					}
					final int eventEndMarkerIndex = pair[1];
					if (eventEndMarkerIndex > 0) {
						final int eventEndXmlIndex = eventEndMarkerIndex + 1;
						final String textEvent = workingText = text.substring(eventStartMarkerIndex, eventEndXmlIndex);
						logEvent(textEvent);
						beginIndex = eventEndXmlIndex;
					} else {
						// No more events or partial XML only in the buffer.
						// Save the unprocessed string part
						textRemains = text.substring(beginIndex);
						break;
					}
				}
			}
		} catch (final IOException ex) {
			logger.error(workingText, ex);
		}
	}

	protected static final int END = -1;
	private static final int[] END_PAIR = new int[] { END, END };
	private static final char EVENT_END_MARKER = '}';
	private static final char EVENT_START_MARKER = '{';
	private static final char JSON_ESC = '\\';
	private static final char JSON_STR_DELIM = '\"';

	protected int[] getEventIndices(final String text, final int beginIndex) {
		// Scan the text for the end of the next JSON object.
		final int start = text.indexOf(EVENT_START_MARKER, beginIndex);
		if (start == END) {
			return END_PAIR;
		}
		final char[] charArray = text.toCharArray();
		int stack = 0;
		boolean inStr = false;
		boolean inEsc = false;
		for (int i = start; i < charArray.length; i++) {
			final char c = charArray[i];
			if (inEsc) {
				// Skip this char and continue
				inEsc = false;
			} else {
				switch (c) {
				case EVENT_START_MARKER:
					if (!inStr) {
						stack++;
					}
					break;
				case EVENT_END_MARKER:
					if (!inStr) {
						stack--;
					}
					break;
				case JSON_STR_DELIM:
					inStr = !inStr;
					break;
				case JSON_ESC:
					inEsc = true;
					break;
				}
				if (stack == 0) {
					return new int[] { start, i };
				}
			}
		}
		return END_PAIR;
	}
}
